Glue ETL Job より Apache Iceberg を操作する
こんにちは、川田です。今回は、Glue ETL job より、Apache Iceberg の Glue Data Catalog テーブルを操作する方法を確認してみます。
結論
操作する方法としては、Glue ETL Job の Job parameters に、下記2点のパラメーターを追加する必要があります。
以降では、Spark SQL を利用して Iceberg を操作する手順を紹介しています。
環境
- Glue Version 4.0
- 利用言語は Python です
事前準備
まずは事前に検証用の環境を用意します。
S3 Bucket を作成
以下、S3 Bucket を 2 つ用意します。
$ aws s3 mb s3://raw-ap-northeast-1-cm-zunda-demo --region ap-northeast-1 # 取込ファイル配置用のバケット $ aws s3 mb s3://stage-ap-northeast-1-cm-zunda-demo --region ap-northeast-1 # Iceberg用のバケット
テスト用の取込ファイルを配置
上記で作成した raw bucket 側に、取込用の CSV ファイルを配置しておきます。
$ cat <<EOF | aws s3 cp - s3://raw-ap-northeast-1-cm-zunda-demo/source/sample-1.csv > id,user_name,prefecture,age > 0001,AAA,Tokyo,20 > 0002,BBB,Osaka,30 > 0003,CCC,Tokyo,25 > 0004,DDD,Osaka,35 > 0005,EEE,Fukuoka,40 > EOF
Glue ETL Job 用の IAM Role を作成
Glue ETL Job で利用する IAM Role を作成します。
$ aws iam create-role --role-name demo-glue-etl-job-service-role --assume-role-policy-document \ '{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "glue.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }'
AWSGlueServiceRole
と AmazonS3FullAccess
の Managed Policy を付与しておきます。
$ aws iam attach-role-policy --role-name demo-glue-etl-job-service-role \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole $ aws iam attach-role-policy --role-name demo-glue-etl-job-service-role \ --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
Glue Data Catalog に、Iceberg 用のテーブルを作成
Glue Data Catalog に demo という名前のデータベースを作成します。
$ aws glue create-database --region ap-northeast-1 --database-input Name=demo
作成したデータベースに、sample という名前の Iceberg 用テーブルを作成しておきます。
$ aws glue create-table --database-name demo --region ap-northeast-1 \ --open-table-format-input IcebergInput={MetadataOperation=CREATE} \ --table-input \ '{ "Name":"sample", "StorageDescriptor": { "Columns":[ {"Name":"id", "Type":"string"}, {"Name":"user_name", "Type":"string"}, {"Name":"prefecture", "Type":"string"}, {"Name":"age", "Type":"int"} ], "Location": "s3://stage-ap-northeast-1-cm-zunda-demo/iceberg/", "Compressed": false }, "TableType": "EXTERNAL_TABLE" }'
Iceberg テーブル向けの操作
準備が整ったため、Iceberg テーブル向けの操作を Glue ETL より行います。
Glue ETL Job を作成/実行(ファイル取込処理)
下記コードの Glue ETL Job を作成・実行します。事前作業で用意したファイルを、Iceberg テーブルに取り込む処理となります。
import sys from awsglue.context import GlueContext, DynamicFrame from awsglue.job import Job from awsglue.utils import getResolvedOptions from awsglue.transforms import ApplyMapping from pyspark.context import SparkContext from pyspark.sql import DataFrame args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() gc = GlueContext(sc) job = Job(gc) job.init(args["JOB_NAME"], args) logger = gc.get_logger() dyf: DynamicFrame = gc.create_dynamic_frame.from_options( connection_type="s3", format_options={ "withHeader": True, "separator": ",", }, format="csv", connection_options={ "paths": ["s3://raw-ap-northeast-1-cm-zunda-demo/source/"], "recurse": True }, transformation_ctx="load_raw_bucket" ) dyf: DynamicFrame = ApplyMapping.apply( frame=dyf, mappings=[ ("id", "string", "id", "string"), ("user_name", "string", "user_name", "string"), ("prefecture", "string", "prefecture", "string"), ("age", "string", "age", "int") ], transformation_ctx="apply_mapping" ) df: DataFrame = dyf.toDF() gc.write_data_frame.from_catalog( frame=df, database="demo", table_name="sample" ) job.commit()
ジョブ・パラメーターには、下記を設定します。
parameter | value |
---|---|
Name | job-sink-iceberg |
IAM Role | demo-glue-etl-job-service-role ※準備作業で作成したもの |
Type | Spark |
Glue version | Glue 4.0 |
Language | Python 3 |
Worker type | G 1X |
Requested number of workers | 2 |
Job bookmark | Enable |
Number of retries | 0 |
Job timeout (minutes) | 180 |
Continuous logging | Enable logs in CloudWatch. |
Use Glue data catalog as the Hive metastore | Enable |
Job parameters | --datalake-formats と --conf を追加 ※下記参考 |
重要な点が、 Job parameters で --datalake-formats: iceberg
と --conf: xxxx
の値を渡すことです。
--conf
パラメーターの Value 値は、具体的には下記となります。spark.sql.catalog.glue_catalog.warehouse
パラメーターで指定する S3 Bucket と Path の値は、Iceberg 向けに作成した Glue Catalog テーブルで指定した S3 Bucket と Path になります。
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://<iceberg-bucket-name>/<iceberg-bucket-path>/ --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
Iceberg 向けの設定として特別な点は、この Job parameters を渡すという点だけになります。
Athena で結果確認(ファイル取込処理)
テーブル内の結果を、Athena にて確認します。
ちゃんと取り込めています。
スキーマ情報とパーティション情報。
Glue ETL Job を作成/実行(Schema/Partition Evolution)
続いて、Iceberg テーブルに area
という名前のカラムを ADD COLUMN
して ADD PARTITION
する Spark SQL のコードを実行します。
import sys from awsglue.utils import getResolvedOptions from awsglue.context import GlueContext from pyspark.context import SparkContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() gc = GlueContext(sc) spark = gc.spark_session job = Job(gc) job.init(args["JOB_NAME"], args) logger = gc.get_logger() stmt_add_column = """ ALTER TABLE glue_catalog.demo.sample ADD COLUMNS (area string) """ spark.sql(stmt_add_column) stmt_add_partition = """ ALTER TABLE glue_catalog.demo.sample ADD PARTITION FIELD area """ spark.sql(stmt_add_partition) job.commit()
ジョブ・パラメーターの値は、job 名を除き上述ジョブと同じ値になるため省略します。
Athena で確認(Schema/Partition Evolution)
結果を確認します。
スキーマ情報とパーティション情報。ちゃんと更新されています。
実際のパーティション情報。area カラムに値のあるレコードがないため、当然全て null となっています。
Glue ETL Job を作成/実行(Insert overwrite)
パーティションの設定を反映させるため、INSERT OVERWRITE コマンドにて area カラムの値を更新します。
import sys from awsglue.utils import getResolvedOptions from awsglue.context import GlueContext from pyspark.context import SparkContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() gc = GlueContext(sc) spark = gc.spark_session job = Job(gc) job.init(args["JOB_NAME"], args) logger = gc.get_logger() stmt = """ insert overwrite glue_catalog.demo.sample select id, user_name, prefecture, age, case when prefecture = 'Tokyo' then 'Kanto' when prefecture = 'Osaka' then 'Kansai' when prefecture = 'Fukuoka' then 'Kyushu' end as area from glue_catalog.demo.sample """ spark.sql(stmt) job.commit()
ジョブ・パラメーターの値は、job 名を除き上述ジョブと同じ値になるため省略します。
Athena で確認(Insert overwrite)
結果を確認します。
area カラムの値が更新されています。
パーティション化されました。
その他
投稿した内容では、Iceberg 用の config の設定を ETL ジョブのパラメーターとして渡しましたが、下記のようにコード内にコンフィグを記載する方法もあるようです。
from pyspark.context import SparkContext from pyspark.conf import SparkConf conf = SparkConf() conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") conf.set("spark.sql.catalog.glue_catalog.warehouse", "s3://<iceberg-bucket-name>/<iceberg-bucket-path>/") conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") sc = SparkContext(conf=conf)